/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.netty.buffer;


import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;

import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Acts a Thread cache for allocations. This implementation is moduled after
 * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and
 * the
 * descripted technics of
 * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
 * Scalable memory allocation using jemalloc</a>.
 */
final class PoolThreadCache {

  private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

  final PoolArena<byte[]> heapArena;
  final PoolArena<ByteBuffer> directArena;

  // Hold the caches for the different size classes, which are tiny, small and normal.
  private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
  private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
  private final MemoryRegionCache<byte[]>[] normalHeapCaches;
  private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
  private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
  private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

  // Used for bitshifting when calculate the index of normal caches later
  private final int numShiftsNormalDirect;
  private final int numShiftsNormalHeap;
  private final int freeSweepAllocationThreshold;
  private final AtomicBoolean freed = new AtomicBoolean();

  private int allocations;

  // TODO: Test if adding padding helps under contention
  //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

  PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
      int tinyCacheSize, int smallCacheSize, int normalCacheSize,
      int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
    checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    if (directArena != null) {
      tinySubPageDirectCaches = createSubPageCaches(
          tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
      smallSubPageDirectCaches = createSubPageCaches(
          smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

      numShiftsNormalDirect = log2(directArena.pageSize);
      normalDirectCaches = createNormalCaches(
          normalCacheSize, maxCachedBufferCapacity, directArena);

      directArena.numThreadCaches.getAndIncrement();
    } else {
      // No directArea is configured so just null out all caches
      tinySubPageDirectCaches = null;
      smallSubPageDirectCaches = null;
      normalDirectCaches = null;
      numShiftsNormalDirect = -1;
    }
    if (heapArena != null) {
      // Create the caches for the heap allocations
      tinySubPageHeapCaches = createSubPageCaches(
          tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
      smallSubPageHeapCaches = createSubPageCaches(
          smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

      numShiftsNormalHeap = log2(heapArena.pageSize);
      normalHeapCaches = createNormalCaches(
          normalCacheSize, maxCachedBufferCapacity, heapArena);

      // 这里numThreadCaches记录了每个Arena被多少个线程所持有
      heapArena.numThreadCaches.getAndIncrement();
    } else {
      // No heapArea is configured so just null out all caches
      tinySubPageHeapCaches = null;
      smallSubPageHeapCaches = null;
      normalHeapCaches = null;
      numShiftsNormalHeap = -1;
    }

    // Only check if there are caches in use.
    if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null
        || normalDirectCaches != null
        || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null
        || normalHeapCaches != null)
        && freeSweepAllocationThreshold < 1) {
      throw new IllegalArgumentException("freeSweepAllocationThreshold: "
          + freeSweepAllocationThreshold + " (expected: > 0)");
    }
  }

  private static <T> MemoryRegionCache<T>[] createSubPageCaches(
      int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0 && numCaches > 0) {
      @SuppressWarnings("unchecked")
      MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
      for (int i = 0; i < cache.length; i++) {
        // TODO: maybe use cacheSize / cache.length
        cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
      }
      return cache;
    } else {
      return null;
    }
  }

  private static <T> MemoryRegionCache<T>[] createNormalCaches(
      int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
    if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
      int min = Math.min(area.chunkSize, maxCachedBufferCapacity);  // 这里maxCachedBufferCapacity=32K，其指定了chunk类型的缓存最大的大小
      int arraySize = Math.max(1, log2(min / area.pageSize) + 1); // 这里arraySize = log2(32K / 8K) + 1 = 3，即数组长度为3

      @SuppressWarnings("unchecked")
      MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
      for (int i = 0; i < cache.length; i++) {
        // 这里cacheSize=64，每个NormalMemoryRegionCache中都维护了一个缓存队列，cacheSize指定的就是该队列的大小
        cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
      }
      return cache;
    } else {
      return null;
    }
  }

  private static int log2(int val) {
    int res = 0;
    while (val > 1) {
      val >>= 1;
      res++;
    }
    return res;
  }

  /**
   * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code
   * false} otherwise
   */
  boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    // 这里cacheForTiny方法将会返回目标容量对应的MemoryRegionCache对象
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
  }

  /**
   * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code
   * false} otherwise
   */
  boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity,
      int normCapacity) {
    return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
  }

  /**
   * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code
   * false} otherwise
   */
  boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity,
      int normCapacity) {
    return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
    // 这里在初始化PoolThreadCache对象的时候会对相应的cache数组进行初始化，因而这里正常情况下是不会为空的
    if (cache == null) {
      // no cache found so just return false here
      return false;
    }

    boolean allocated = cache.allocate(buf, reqCapacity);
    if (++allocations >= freeSweepAllocationThreshold) {
      allocations = 0;
      trim();
    }
    return allocated;
  }

  /**
   * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room. Returns {@code
   * true} if it fit into the cache {@code false} otherwise.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
      long handle, int normCapacity, SizeClass sizeClass) {
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {
      return false;
    }
    return cache.add(chunk, nioBuffer, handle);
  }

  private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
    switch (sizeClass) {
      case Normal:
        return cacheForNormal(area, normCapacity);
      case Small:
        return cacheForSmall(area, normCapacity);
      case Tiny:
        return cacheForTiny(area, normCapacity);
      default:
        throw new Error();
    }
  }

  /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
  @Override
  protected void finalize() throws Throwable {
    try {
      super.finalize();
    } finally {
      free();
    }
  }

  /**
   * Should be called if the Thread that uses this cache is about to exist to release resources out
   * of the cache
   */
  void free() {
    // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
    // we only call this one time.
    if (freed.compareAndSet(false, true)) {
      int numFreed = free(tinySubPageDirectCaches) +
          free(smallSubPageDirectCaches) +
          free(normalDirectCaches) +
          free(tinySubPageHeapCaches) +
          free(smallSubPageHeapCaches) +
          free(normalHeapCaches);

      if (numFreed > 0 && logger.isDebugEnabled()) {
        logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
            Thread.currentThread().getName());
      }

      if (directArena != null) {
        directArena.numThreadCaches.getAndDecrement();
      }

      if (heapArena != null) {
        heapArena.numThreadCaches.getAndDecrement();
      }
    }
  }

  private static int free(MemoryRegionCache<?>[] caches) {
    if (caches == null) {
      return 0;
    }

    int numFreed = 0;
    for (MemoryRegionCache<?> c : caches) {
      numFreed += free(c);
    }
    return numFreed;
  }

  private static int free(MemoryRegionCache<?> cache) {
    if (cache == null) {
      return 0;
    }
    return cache.free();
  }

  void trim() {
    trim(tinySubPageDirectCaches);
    trim(smallSubPageDirectCaches);
    trim(normalDirectCaches);
    trim(tinySubPageHeapCaches);
    trim(smallSubPageHeapCaches);
    trim(normalHeapCaches);
  }

  private static void trim(MemoryRegionCache<?>[] caches) {
    if (caches == null) {
      return;
    }
    for (MemoryRegionCache<?> c : caches) {
      trim(c);
    }
  }

  private static void trim(MemoryRegionCache<?> cache) {
    if (cache == null) {
      return;
    }
    cache.trim();
  }

  private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    // 将目标容量除以16，以便得到其需要使用多少个16字节，这里需要注意的是，idx是不会为0的，
    // 因为normCapacity必然是一个16的倍数
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
      // 这里tinySubPageDirectCaches的长度为32，每个元素内部维护的队列长度为512。
      // 需要注意的是，tinySubPageDirectCaches如何进行分层的在这里就可以明显看出来，这里cache()方法返回的就是
      // tinySubPageDirectCaches[idx]，也就是说数组的每个节点都是以16字节进行分层的，比如这里normCapacity是112，
      // idx计算得到7，那么这里返回的就是tinySubPageDirectCaches[7]，意思就是tinySubPageDirectCaches的数组下标
      // 为7的位置都是存储的字节长度为112的元素。因而我们可以总结出其进行分层的原理如下：
      // 下标    字节
      //  0      16
      //  1      32
      //  2      48
      //  3      64
      //  4      80
      //  .       .
      //  .       .
      //  .       .
      //  31     496
      // 从上面的分析我们可以看出，tinySubPageDirectCaches中存储的最大字节数是496，而不是512，
      // 而512则实际上是被算为small类型的字节了
      return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
  }

  private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.smallIdx(normCapacity);
    if (area.isDirect()) {
      // 这里smallSubPageDirectCaches的长度为4，每个元素内部维护的队列长度为256
      return cache(smallSubPageDirectCaches, idx);
    }
    return cache(smallSubPageHeapCaches, idx);
  }

  private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
    if (area.isDirect()) {
      int idx = log2(normCapacity >> numShiftsNormalDirect);
      // 这里normalDirectCaches的长度为3，每个元素内部维护的队列长度为64
      return cache(normalDirectCaches, idx);
    }
    int idx = log2(normCapacity >> numShiftsNormalHeap);
    return cache(normalHeapCaches, idx);
  }

  private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
      return null;
    }
    return cache[idx];
  }

  /**
   * Cache used for buffers which are backed by TINY or SMALL size.
   */
  private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {

    SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
      super(size, sizeClass);
    }

    @Override
    protected void initBuf(
        PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf,
        int reqCapacity) {
      chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
    }
  }

  /**
   * Cache used for buffers which are backed by NORMAL size.
   */
  private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {

    NormalMemoryRegionCache(int size) {
      super(size, SizeClass.Normal);
    }

    @Override
    protected void initBuf(
        PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf,
        int reqCapacity) {
      chunk.initBuf(buf, nioBuffer, handle, reqCapacity);
    }
  }

  private abstract static class MemoryRegionCache<T> {

    private final int size;
    private final Queue<Entry<T>> queue;
    private final SizeClass sizeClass;
    private int allocations;

    MemoryRegionCache(int size, SizeClass sizeClass) {
      this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
      queue = PlatformDependent.newFixedMpscQueue(this.size);
      this.sizeClass = sizeClass;
    }

    /**
     * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity
     * restrictions.
     */
    protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
        PooledByteBuf<T> buf, int reqCapacity);

    /**
     * Add to cache if not already full.
     */
    @SuppressWarnings("unchecked")
    public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
      Entry<T> entry = newEntry(chunk, nioBuffer, handle);
      boolean queued = queue.offer(entry);
      if (!queued) {
        // If it was not possible to cache the chunk, immediately recycle the entry
        entry.recycle();
      }

      return queued;
    }

    /**
     * Allocate something out of the cache if possible and remove the entry from the cache.
     */
    public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
      // 尝试从队列中获取元素，如果未做过缓存，则返回的值为null，表示从当前线程的缓存中获取失败
      Entry<T> entry = queue.poll();
      if (entry == null) {
        return false;
      }
      initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
      entry.recycle();

      // allocations is not thread-safe which is fine as this is only called from the same thread all time.
      ++allocations;
      return true;
    }

    /**
     * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
     */
    public final int free() {
      return free(Integer.MAX_VALUE);
    }

    private int free(int max) {
      int numFreed = 0;
      for (; numFreed < max; numFreed++) {
        Entry<T> entry = queue.poll();
        if (entry != null) {
          freeEntry(entry);
        } else {
          // all cleared
          return numFreed;
        }
      }
      return numFreed;
    }

    /**
     * Free up cached {@link PoolChunk}s if not allocated frequently enough.
     */
    public final void trim() {
      int free = size - allocations;
      allocations = 0;

      // We not even allocated all the number that are
      if (free > 0) {
        free(free);
      }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void freeEntry(Entry entry) {
      PoolChunk chunk = entry.chunk;
      long handle = entry.handle;
      ByteBuffer nioBuffer = entry.nioBuffer;

      // recycle now so PoolChunk can be GC'ed.
      entry.recycle();

      chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer);
    }

    static final class Entry<T> {

      final Handle<Entry<?>> recyclerHandle;
      PoolChunk<T> chunk;
      ByteBuffer nioBuffer;
      long handle = -1;

      Entry(Handle<Entry<?>> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
      }

      void recycle() {
        chunk = null;
        nioBuffer = null;
        handle = -1;
        recyclerHandle.recycle(this);
      }
    }

    @SuppressWarnings("rawtypes")
    private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
      Entry entry = RECYCLER.get();
      entry.chunk = chunk;
      entry.nioBuffer = nioBuffer;
      entry.handle = handle;
      return entry;
    }

    @SuppressWarnings("rawtypes")
    private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
      @SuppressWarnings("unchecked")
      @Override
      protected Entry newObject(Handle<Entry> handle) {
        return new Entry(handle);
      }
    };
  }
}
